package com.example.demo.kafkaDemo;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Builds a {@link KafkaTemplate} whose producer uses {@link StringSerializer} for
 * both keys and values and authenticates with SASL/SCRAM-SHA-512 over
 * {@code SASL_PLAINTEXT}.
 *
 * <p>The Spring annotations directly above the class declaration are commented
 * out, so the {@code @Bean} method below is only picked up if this class is
 * registered with the container some other way.
 *
 * <p>NOTE(review): the broker address and the SASL credentials are hard-coded in
 * this class; they should be moved to external configuration.
 */
//@Configuration
//@EnableKafka
public class DemoKafkaProducerConfig {

    /**
     * Static handle to the template created by {@link #kafkaTemplate()}.
     * It is {@code null} until that method has been called.
     */
    public static KafkaTemplate<String, Object> kafkaTemplate;

    /** Kafka bootstrap server address ({@code host:port}). */
    private final String servers = "172.20.52.22:9505";

    /**
     * Creates the template on top of a new producer factory and also stores it
     * in the static {@link #kafkaTemplate} field.
     *
     * @return the newly created template
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerTemplateFactory());
        kafkaTemplate = template;
        return template;
    }

    /**
     * Creates a producer factory from {@link #producerConfigs()}.
     *
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    private ProducerFactory<String, Object> producerTemplateFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Assembles the producer properties: broker address, serializers,
     * compression and SASL authentication settings.
     *
     * @return a new mutable map of producer configuration properties
     */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Kafka broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Message serialization types.
        // NOTE(review): the template is typed <String, Object> but values are
        // serialized with StringSerializer, so presumably only String payloads
        // can be sent — confirm against callers.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        // Authentication settings.
        // The JAAS entry must end with ';' — Kafka's JAAS config parser rejects
        // an unterminated entry with an IllegalArgumentException.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"sailing\" password=\"Sailing@2020\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        return props;
    }
}
